All files / platform_browser webchannel_connection.ts

86.26% Statements 113/131
61.29% Branches 19/31
90% Functions 18/20
86.05% Lines 111/129
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367                                2x                   2x   2x       2x 2x 2x 2x       2x   2x 2x     2x               2x   2x   2x           442x 442x 442x 442x             2x 620x             620x     620x         216x         216x   216x   216x 216x 216x 216x   180x 180x 180x 180x               36x 36x             36x 36x                           36x                           216x 216x       216x 216x           216x   216x   216x         2x             84x     2x       404x               404x 404x                                 404x 404x 404x   404x             404x         404x   404x   1313x 1313x 398x 398x 398x   1313x 1313x   
      395x             404x           1616x 3160x 3160x                 404x 397x 397x       404x 396x 389x 389x 389x       404x                                   404x     2367x 2367x 2367x             2367x 2367x 7x   7x 7x 7x 7x                 7x 7x 7x   2360x 2360x           404x         404x   404x       2x 217x 217x 217x 217x 217x   217x 217x 217x   217x 217x 217x   2x  
/**
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
import {
  ErrorCode,
  EventType,
  WebChannel,
  XhrIoPool,
  createWebChannelTransport
} from '@firebase/webchannel-wrapper';
 
import { Token } from '../api/credentials';
import { DatabaseId, DatabaseInfo } from '../core/database_info';
import { SDK_VERSION } from '../core/version';
import { Connection, Stream } from '../remote/connection';
import {
  mapCodeFromHttpStatus,
  mapCodeFromRpcStatus
} from '../remote/rpc_error';
import { StreamBridge } from '../remote/stream_bridge';
import { assert, fail } from '../util/assert';
import { Code, FirestoreError } from '../util/error';
import * as log from '../util/log';
import { Rejecter, Resolver } from '../util/promise';
import { StringMap } from '../util/types';
 
/** Tag passed as the first argument to every log.debug() call in this file. */
const LOG_TAG = 'Connection';

/** Service name placed in the path of streaming (WebChannel) URLs. */
const RPC_STREAM_SERVICE = 'google.firestore.v1beta1.Firestore';

/** Version segment placed in the path of REST (unary RPC) URLs. */
const RPC_URL_VERSION = 'v1beta1';

/**
 * Lookup table from an RPC name to the REST endpoint name that is appended
 * (after a ':') to the documents URL.
 */
const RPC_NAME_REST_MAPPING: { [rpcName: string]: string } = {
  BatchGetDocuments: 'batchGet',
  Commit: 'commit'
};

// TODO(b/38203344): The SDK_VERSION is set independently from Firebase because
// we are doing out-of-band releases. Once we release as part of Firebase, we
// should use the Firebase version instead.
/** Value sent in the 'X-Goog-Api-Client' header of every request. */
const X_GOOG_API_CLIENT_VALUE = `gl-js/ fire/${SDK_VERSION}`;

/**
 * Timeout handed to xhr.send() for unary RPCs (in seconds, going by the
 * constant's name).
 */
const XHR_TIMEOUT_SECS = 15;
 
/**
 * Connection implementation for the browser platform. Unary RPCs are sent as
 * POST requests through a pool of XHR objects; streams are opened as
 * WebChannels and exposed through a StreamBridge.
 */
export class WebChannelConnection implements Connection {
  private readonly databaseId: DatabaseId;
  private readonly baseUrl: string;
  private readonly pool: XhrIoPool;

  /**
   * @param info Supplies the database id, the host, and whether SSL (https)
   *     should be used when building URLs.
   */
  constructor(info: DatabaseInfo) {
    this.databaseId = info.databaseId;
    this.pool = new XhrIoPool();
    const proto = info.ssl ? 'https' : 'http';
    this.baseUrl = proto + '://' + info.host;
  }

  /**
   * Modifies the headers for a request, adding any authorization token if
   * present and any additional headers for the request.
   *
   * @param headers Header map that is mutated in place.
   * @param token Token whose authHeaders are copied into `headers`, or null
   *     when there are no auth headers to add.
   */
  private modifyHeadersForRequest(headers: StringMap, token: Token | null) {
    if (token) {
      for (const header in token.authHeaders) {
        if (token.authHeaders.hasOwnProperty(header)) {
          headers[header] = token.authHeaders[header];
        }
      }
    }
    headers['X-Goog-Api-Client'] = X_GOOG_API_CLIENT_VALUE;
    // This header is used to improve routing and project isolation by the
    // backend.
    headers['google-cloud-resource-prefix'] =
      `projects/${this.databaseId.projectId}/` +
      `databases/${this.databaseId.database}`;
  }

  /**
   * Sends a unary RPC as a POST request and resolves with the parsed JSON
   * response.
   *
   * @param rpcName Name of the RPC; must have an entry in the REST mapping
   *     (see makeUrl()).
   * @param request Request payload; serialized with JSON.stringify().
   * @param token Token whose auth headers are attached, or null.
   * @returns A promise that resolves with the response JSON, or rejects with
   *     a FirestoreError: DEADLINE_EXCEEDED on timeout, a code mapped from the
   *     HTTP status on an HTTP error, UNAVAILABLE when an HTTP error carries
   *     no status code, and INTERNAL when handling the response fails
   *     unexpectedly.
   */
  invokeRPC<Req, Resp>(
    rpcName: string,
    request: Req,
    token: Token | null
  ): Promise<Resp> {
    const url = this.makeUrl(rpcName);

    return new Promise((resolve: Resolver<Resp>, reject: Rejecter) => {
      // tslint:disable-next-line:no-any XhrIoPool doesn't have TS typings.
      this.pool.getObject((xhr: any) => {
        xhr.listenOnce(EventType.COMPLETE, () => {
          try {
            switch (xhr.getLastErrorCode()) {
              case ErrorCode.NO_ERROR: {
                const json = xhr.getResponseJson() as Resp;
                log.debug(LOG_TAG, 'XHR received:', JSON.stringify(json));
                resolve(json);
                break;
              }
              case ErrorCode.TIMEOUT: {
                log.debug(LOG_TAG, 'RPC "' + rpcName + '" timed out');
                reject(
                  new FirestoreError(Code.DEADLINE_EXCEEDED, 'Request time out')
                );
                break;
              }
              case ErrorCode.HTTP_ERROR: {
                const status = xhr.getStatus();
                log.debug(
                  LOG_TAG,
                  'RPC "' + rpcName + '" failed with status:',
                  status,
                  'response text:',
                  xhr.getResponseText()
                );
                if (status > 0) {
                  reject(
                    new FirestoreError(
                      mapCodeFromHttpStatus(status),
                      'Server responded with status ' + xhr.getStatusText()
                    )
                  );
                } else {
                  // If we received an HTTP_ERROR but there's no status code,
                  // it's most probably a connection issue
                  log.debug(LOG_TAG, 'RPC "' + rpcName + '" failed');
                  reject(
                    new FirestoreError(Code.UNAVAILABLE, 'Connection failed.')
                  );
                }
                break;
              }
              default:
                fail(
                  'RPC "' +
                    rpcName +
                    '" failed with unanticipated ' +
                    'webchannel error ' +
                    xhr.getLastErrorCode() +
                    ': ' +
                    xhr.getLastError() +
                    ', giving up.'
                );
            }
          } catch (e) {
            // Anything thrown in this listener (e.g. by fail() above or while
            // reading the response) would otherwise leave the promise
            // unsettled forever. Reject it, then rethrow so the failure is
            // still raised exactly as before. If the promise was already
            // settled this reject() is a no-op.
            reject(
              new FirestoreError(
                Code.INTERNAL,
                'RPC "' +
                  rpcName +
                  '" failed unexpectedly: ' +
                  (e instanceof Error ? e.message : String(e))
              )
            );
            throw e;
          } finally {
            // Always hand the XHR object back to the pool.
            log.debug(LOG_TAG, 'RPC "' + rpcName + '" completed.');
            this.pool.releaseObject(xhr);
          }
        });

        const requestString = JSON.stringify(request);
        log.debug(LOG_TAG, 'XHR sending: ', url + ' ' + requestString);
        // Content-Type: text/plain will avoid preflight requests which might
        // mess with CORS and redirects by proxies. If we add custom headers
        // we will need to change this code to potentially use the
        // $httpOverwrite parameter supported by ESF to avoid
        // triggering preflight requests.
        const headers: StringMap = { 'Content-Type': 'text/plain' };

        this.modifyHeadersForRequest(headers, token);

        xhr.send(url, 'POST', requestString, headers, XHR_TIMEOUT_SECS);
      });
    });
  }

  /**
   * Invokes an RPC whose response is a list of results.
   *
   * @param rpcName Name of the RPC; see invokeRPC().
   * @param request Request payload.
   * @param token Token whose auth headers are attached, or null.
   * @returns A promise for the array of results.
   */
  invokeStreamingRPC<Req, Resp>(
    rpcName: string,
    request: Req,
    token: Token | null
  ): Promise<Resp[]> {
    // The REST API automatically aggregates all of the streamed results, so we
    // can just use the normal invoke() method.
    return this.invokeRPC<Req, Resp[]>(rpcName, request, token);
  }

  /**
   * Creates a WebChannel-backed stream for the given RPC.
   *
   * The underlying channel is not opened until the first message is sent;
   * the stream's open callback is nevertheless fired asynchronously right
   * after this method returns.
   *
   * @param rpcName Name of the streaming RPC; used as a URL path segment.
   * @param token Token whose auth headers are sent with the handshake, or
   *     null.
   * @returns The StreamBridge through which messages are sent and received.
   */
  openStream<Req, Resp>(
    rpcName: string,
    token: Token | null
  ): Stream<Req, Resp> {
    const urlParts = [
      this.baseUrl,
      '/',
      RPC_STREAM_SERVICE,
      '/',
      rpcName,
      '/channel'
    ];
    const webchannelTransport = createWebChannelTransport();
    const request = {
      // Background channel test avoids the initial two test calls and decreases
      // initial cold start time.
      // TODO(dimond): wenboz@ mentioned this might affect use with proxies and
      // we should monitor closely for any reports.
      backgroundChannelTest: true,
      // Required for backend stickiness, routing behavior is based on this
      // parameter.
      httpSessionIdParam: 'gsessionid',
      initMessageHeaders: {},
      // Send our custom headers as a '$httpHeaders=' url param to avoid CORS
      // preflight round-trip. This is formally defined here:
      // https://github.com/google/closure-library/blob/b0e1815b13fb92a46d7c9b3c30de5d6a396a3245/closure/goog/net/rpc/httpcors.js#L40
      httpHeadersOverwriteParam: '$httpHeaders',
      sendRawJson: true,
      supportsCrossDomainXhr: true
    };
    const url = urlParts.join('');
    // Serialize the options explicitly: concatenating the object itself would
    // only print "[object Object]". This is done before the auth headers are
    // merged in so that credentials do not end up in the debug log.
    log.debug(
      LOG_TAG,
      'Creating WebChannel: ' + url + ' ' + JSON.stringify(request)
    );
    this.modifyHeadersForRequest(request.initMessageHeaders, token);
    // tslint:disable-next-line:no-any Because listen isn't defined on it.
    const channel = webchannelTransport.createWebChannel(url, request) as any;

    // WebChannel supports sending the first message with the handshake - saving
    // a network round trip. However, it will have to call send in the same
    // JS event loop as open. In order to enforce this, we delay actually
    // opening the WebChannel until send is called. Whether we have called
    // open is tracked with this variable.
    let opened = false;

    // A flag to determine whether the stream was closed (by us or through an
    // error/close event) to avoid delivering multiple close events or sending
    // on a closed stream
    let closed = false;

    const streamBridge = new StreamBridge<Req, Resp>({
      sendFn: (msg: Req) => {
        if (!closed) {
          if (!opened) {
            log.debug(LOG_TAG, 'Opening WebChannel transport.');
            channel.open();
            opened = true;
          }
          log.debug(LOG_TAG, 'WebChannel sending:', msg);
          channel.send(msg);
        } else {
          log.debug(LOG_TAG, 'Not sending because WebChannel is closed:', msg);
        }
      },
      closeFn: () => channel.close()
    });

    // Closure events are guarded and exceptions are swallowed, so catch any
    // exception and rethrow using a setTimeout so they become visible again.
    // Note that eventually this function could go away if we are confident
    // enough the code is exception free.
    const unguardedEventListen = <T>(
      type: WebChannel.EventType,
      fn: (param?: T) => void
    ) => {
      // TODO(dimond): closure typing seems broken because WebChannel does
      // not implement goog.events.Listenable
      channel.listen(type, (param?: T) => {
        try {
          fn(param);
        } catch (e) {
          setTimeout(() => {
            throw e;
          }, 0);
        }
      });
    };

    unguardedEventListen(WebChannel.EventType.OPEN, () => {
      if (!closed) {
        log.debug(LOG_TAG, 'WebChannel transport opened.');
      }
    });

    unguardedEventListen(WebChannel.EventType.CLOSE, () => {
      if (!closed) {
        closed = true;
        log.debug(LOG_TAG, 'WebChannel transport closed');
        streamBridge.callOnClose();
      }
    });

    unguardedEventListen<Error>(WebChannel.EventType.ERROR, err => {
      if (!closed) {
        closed = true;
        log.debug(LOG_TAG, 'WebChannel transport errored:', err);
        streamBridge.callOnClose(
          new FirestoreError(
            Code.UNAVAILABLE,
            'The operation could not be completed'
          )
        );
      }
    });

    // WebChannel delivers message events as array. If batching is not enabled
    // (it's off by default) each message will be delivered alone, resulting in
    // a single element array.
    type WebChannelResponse = { data: Resp[] };

    unguardedEventListen<WebChannelResponse>(
      WebChannel.EventType.MESSAGE,
      msg => {
        if (!closed) {
          const msgData = msg.data[0];
          assert(!!msgData, 'Got a webchannel message without data.');
          // TODO(b/35143891): There is a bug in One Platform that caused errors
          // (and only errors) to be wrapped in an extra array. To be forward
          // compatible with the bug we need to check either condition. The latter
          // can be removed once the fix has been rolled out.
          const error =
            // tslint:disable-next-line:no-any msgData.error is not typed.
            (msgData as any).error || (msgData[0] && msgData[0].error);
          if (error) {
            log.debug(LOG_TAG, 'WebChannel received error:', error);
            // error.status will be a string like 'OK' or 'NOT_FOUND'.
            const status: string = error.status;
            let code = mapCodeFromRpcStatus(status);
            let message = error.message;
            if (code === undefined) {
              code = Code.INTERNAL;
              message =
                'Unknown error status: ' +
                status +
                ' with message ' +
                error.message;
            }
            // Mark closed so no further events are propagated
            closed = true;
            streamBridge.callOnClose(new FirestoreError(code, message));
            channel.close();
          } else {
            log.debug(LOG_TAG, 'WebChannel received:', msgData);
            streamBridge.callOnMessage(msgData);
          }
        }
      }
    );

    setTimeout(() => {
      // Technically we could/should wait for the WebChannel opened event,
      // but because we want to send the first message with the WebChannel
      // handshake we pretend the channel opened here (asynchronously), and
      // then delay the actual open until the first message is sent.
      streamBridge.callOnOpen();
    }, 0);
    return streamBridge;
  }

  /**
   * Builds the REST URL for a unary RPC:
   * `<baseUrl>/<version>/projects/<project>/databases/<database>/documents:<endpoint>`.
   *
   * Asserts that `rpcName` has an entry in the REST mapping.
   */
  // visible for testing
  makeUrl(rpcName: string): string {
    const urlRpcName = RPC_NAME_REST_MAPPING[rpcName];
    assert(urlRpcName !== undefined, 'Unknown REST mapping for: ' + rpcName);
    const url = [this.baseUrl, '/', RPC_URL_VERSION];
    url.push('/projects/');
    url.push(this.databaseId.projectId);

    url.push('/databases/');
    url.push(this.databaseId.database);
    url.push('/documents');

    url.push(':');
    url.push(urlRpcName);
    return url.join('');
  }
}